package com.crazymaker.l2cache.cluster.level2.canal;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.crazymaker.l2cache.cluster.level2.RocketMQL2Policy;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

@Slf4j
@Data
public class CannalBinlogMessageListener implements MessageListenerConcurrently {

    private final RocketMQL2Policy clusterPolicy;

    public CannalBinlogMessageListener(RocketMQL2Policy clusterPolicy) {
        this.clusterPolicy = clusterPolicy;
    }

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);

        for (int i = 0; i < msgs.size(); i++) {
            MessageExt msg = msgs.get(i);
            String json = null;
            try {
                json = new String(msg.getBody(), "utf-8");
                FlatMessage flatMessage = JSON.parseObject(json, FlatMessage.class);
                process(flatMessage);
            } catch (Throwable e) {
                e.printStackTrace();
            }


        }
        // 标记该消息已经被成功消费
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }


    public void process(FlatMessage flatMessage) {

        if (flatMessage.getIsDdl()) {

            return;
        }

        List<Map<String, String>> dataList = CanalUtil.getCanalDataList(flatMessage);
        if (SQLType.INSERT.equals(flatMessage.getType())) {
            clusterPolicy.processBinLogInsert(
                    flatMessage.getDatabase(), flatMessage.getTable(), dataList);
        } else if (SQLType.UPDATE.equals(flatMessage.getType())) {
            clusterPolicy.processBinLogUPDATE(
                    flatMessage.getDatabase(), flatMessage.getTable(), dataList);
        } else if (SQLType.DELETE.equals(flatMessage.getType())) {
            clusterPolicy.processBinLogDELETE(
                    flatMessage.getDatabase(), flatMessage.getTable(), dataList);
        }


    }


}